distmerge: add merge processor scaffolding #158341
Conversation
Force-pushed from d5946a1 to 17749c1
jeffswenson
left a comment
LGTM
// spans are the merge tasks. E.g. the merge task `n` is to merge all the input data that overlaps with
// [spans[n].Key, spans[n].EndKey)
repeated roachpb.Span spans = 2 [(gogoproto.nullable) = false];
nit: the formatting is inconsistent in BulkMergeSpec's definition. It seems to be a mix of tabs and spaces.
/me shakes fist at Claude
Force-pushed from ffc184b to 5de1e7f
spilchen
left a comment
@spilchen reviewed 8 of 17 files at r1, 2 of 6 files at r4.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @jeffswenson and @mgartner)
pkg/sql/execinfrapb/processors_bulk_io.proto line 619 at r4 (raw file):
optional string uri = 1 [(gogoproto.nullable) = false];
// start_key is the first key in the SST.
optional bytes start_key = 2 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.Key"];
I see you changed the start_key/end_key from string to bytes. This was a miss in my earlier PR. There are a bunch of other changes needed for that. See #158290. I think I'll go ahead and merge that to unblock this PR.
Force-pushed from 5de1e7f to 827768f
yuzefovich
left a comment
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @jeffswenson, @mgartner, @mw5h, and @spilchen)
pkg/sql/execinfrapb/processors_bulk_io.proto line 188 at r7 (raw file):
// MergeLoopback is scheduled on the same node as the MergeCoordinator.
// MegeCoordinator is the final processor in the flow and MergeLoopback is the
nit: s/Mege/Merge/.
pkg/sql/execinfrapb/processors_bulk_io.proto line 603 at r7 (raw file):
message BulkMergeSpec {
  // ssts is the list of input SSTs to merge.
  repeated SST ssts = 1 [(gogoproto.nullable) = false];
nit: would be nice to give this a customname of SSTs. Ditto in the output proto below.
pkg/sql/bulkmerge/merge_test.go line 53 at r7 (raw file):
})
sqlReciever := sql.MakeDistSQLReceiver(
nit: s/Reciever/Receiver/.
pkg/sql/physicalplan/routing.go line 44 at r7 (raw file):
startDatum, endDatum := RoutingDatumsForSQLInstance(sqlInstanceID)
startBytes, endBytes := make([]byte, 0), make([]byte, 0)
nit: this seems unusual - we can just pass nil as the appendTo argument of EncDatum.Encode.
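(For anyone curious why nil works here, a minimal standalone Go sketch, not CRDB code, showing that appending to a nil slice behaves the same as appending to an empty one:)

package main

import "fmt"

func main() {
	// A nil slice has length 0 and capacity 0, just like make([]byte, 0),
	// so append grows it identically; passing nil as an append-to buffer
	// simply skips constructing the empty slice up front.
	var nilBuf []byte
	nilBuf = append(nilBuf, 0x01, 0x02)

	emptyBuf := make([]byte, 0)
	emptyBuf = append(emptyBuf, 0x01, 0x02)

	fmt.Println(nilBuf, emptyBuf) // [1 2] [1 2]
}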
pkg/sql/bulkmerge/merge_planning.go line 23 at r7 (raw file):
// NOTE: This implementation is inspired by the physical plan created by
// restore in `pkg/backup/restore_processor_planning.go`
planCtx, sqlInstanceIDs, err := execCtx.DistSQLPlanner().SetupAllNodesPlanning(
We'll need to be careful about the mixed-version state where the coordinator node is already running a CRDB version supporting the bulk merge, but other nodes in the cluster do not yet. We probably just want to add a version gate somewhere, so maybe let's leave a TODO for now.
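(To make the mixed-version hazard concrete, here is a toy, self-contained Go sketch of the kind of gate such a TODO would eventually become; the names and version numbers are hypothetical and this is not the real clusterversion API.)

package main

import (
	"errors"
	"fmt"
)

// nodeInfo is a stand-in for a SQL instance and the (simplified) CRDB version it runs.
type nodeInfo struct {
	id      int
	version int
}

// minBulkMergeVersion is a hypothetical minimum version that understands the
// new bulk merge processor specs.
const minBulkMergeVersion = 25

// checkBulkMergeSupported fails planning when any node in the flow is too old,
// instead of shipping it a processor spec it cannot decode.
func checkBulkMergeSupported(nodes []nodeInfo) error {
	for _, n := range nodes {
		if n.version < minBulkMergeVersion {
			return errors.New("bulk merge requires all nodes to be upgraded")
		}
	}
	return nil
}

func main() {
	mixed := []nodeInfo{{id: 1, version: 25}, {id: 2, version: 24}}
	fmt.Println(checkBulkMergeSupported(mixed)) // mixed-version cluster: returns an error
	upgraded := []nodeInfo{{id: 1, version: 25}, {id: 2, version: 25}}
	fmt.Println(checkBulkMergeSupported(upgraded)) // <nil>
}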
pkg/sql/bulkmerge/merge_processor.go line 33 at r7 (raw file):
// bulkMergeProcessor accepts rows that include an assigned task id and emits
// rows that are (taskID, []ouput_sst) where output_sst is the name of SSTs
nit: s/ouput/output/. Also something is off with "rpo the merged output" below.
Force-pushed from 220af80 to 7e30e1b
Force-pushed from 7e30e1b to 3eda352
mw5h
left a comment
Okay, RFAL
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @mgartner, @spilchen, and @yuzefovich)
pkg/sql/bulkmerge/merge_planning.go line 23 at r7 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
We'll need to be careful about the mixed-version state where the coordinator node is already running a CRDB version supporting the bulk merge, but other nodes in the cluster do not yet. We probably just want to add a version gate somewhere, so maybe let's leave a TODO for now.
Done.
pkg/sql/execinfrapb/processors_bulk_io.proto line 619 at r4 (raw file):
Previously, spilchen wrote…
I see you changed the start_key/end_key from string to bytes. This was a miss in my earlier PR. There are a bunch of other changes needed for that. See #158290. I think I'll go ahead and merge that to unblock this PR.
Done.
// output_uri is a URI like 'nodelocal://1/<job_id>/merger'. The uri should
// use this as the base for all output files. So an output file would be
// 'nodelocal://1/<job_id>/merger/0.sst'
optional string output_uri = 3 [(gogoproto.nullable) = false, (gogoproto.customname) = "OutputURI"];
nit: storing the full cloud.cloudpb.ExternalStorage object would be preferable to just a URI string:
optional cloud.cloudpb.ExternalStorage output_store = 3;
For index backfill usage, it cannot use nodelocal:// if the backfill is occurring in a secondary tenant. It will probably need to use userfile://. And storing just the URI isn't enough to configure that. The ExternalStorage object has other things needed to set that up (i.e. user and session context), which are missing if it's just a URI string.
This doesn't have to hold up the PR as we can easily migrate to this in the future. It just might save a bit of time.
Done.
Force-pushed from 3eda352 to a3b3b10
Potential Bug(s) Detected: The three-stage Claude Code analysis has identified potential bug(s) in this PR that may warrant investigation. Note: When viewing the workflow output, scroll to the bottom to find the Final Analysis Summary. After you review the findings, please tag the issue as follows:
spilchen
left a comment
@spilchen reviewed 5 of 17 files at r1, 1 of 6 files at r4, 3 of 9 files at r10, 7 of 7 files at r13, 3 of 3 files at r14, 3 of 3 files at r15, all commit messages.
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @mgartner and @yuzefovich)
Force-pushed from a3b3b10 to de6956c
Okay, I'll merge this once @yuzefovich gives it a thumbs up.
bors r+
yuzefovich
left a comment
@yuzefovich reviewed 2 of 17 files at r1, 1 of 9 files at r10, 1 of 7 files at r16, 3 of 3 files at r17, 3 of 3 files at r18, all commit messages.
Reviewable status: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @mgartner and @mw5h)
pkg/sql/bulkmerge/merge_processor_test.go line 30 at r18 (raw file):
sqlDB.Exec(t, `CREATE DATABASE test`)
// _, err := newBulkMergeProcessor(ctx, nil, 0, execinfrapb.BulkMergeSpec{}, nil, nil)
nit: seems like leftover.
Force-pushed from de6956c to d96f819
Canceled.
bors r+
Potential Bug(s) Detected: The three-stage Claude Code analysis has identified potential bug(s) in this PR that may warrant investigation. Note: When viewing the workflow output, scroll to the bottom to find the Final Analysis Summary. After you review the findings, please tag the issue as follows:
158341: distmerge: add merge processor scaffolding r=mw5h a=mw5h

#### bulkmerge: add processors and DistSQL physical planning

This commit introduces the initial implementation of bulkmerge processors for distributed merge operations. It adds three new processor types: MergeLoopback, BulkMerge, and MergeCoordinator, along with the DistSQL physical planning infrastructure to orchestrate them. The MergeLoopback processor runs on the coordinator node and generates initial tasks. The BulkMerge processors run on each SQL instance to perform merge operations. The MergeCoordinator collects results from all merge processors. This commit also updates the vectorized execution engine (execplan.go) to recognize the new processor cores, preventing panics when these processors are encountered.

Informs #156580

Release note: None

#### backup: deduplicate routing logic with physicalplan package

Previously, the backup package contained duplicate implementations of routingDatumsForSQLInstance and routingSpanForSQLInstance that were identical to functions recently added to pkg/sql/physicalplan. This commit removes these duplicates and uses the shared implementations: RoutingDatumsForSQLInstance and MakeInstanceRouter. This reduces code duplication and ensures consistent routing behavior across DistSQL physical planning for both backup/restore and bulk merge operations.

Informs #156580

Release note: None

#### physicalplan: set DefaultDest to nil to catch routing bugs

Changes MakeInstanceRouter to set DefaultDest to nil instead of stream 0. When DefaultDest is nil, any routing key that doesn't match a span will produce an error rather than silently routing to an arbitrary stream. This helps catch coordination bugs early. Updates merge_loopback.go to generate a routing key for its SQL instance using physicalplan.RoutingDatumsForSQLInstance instead of a hardcoded "loopback" key. This ensures the routing key matches one of the spans in the router rather than relying on the DefaultDest fallback. The test expectation is updated from "loopback->merge->coordinator" to "node1->merge->coordinator" to reflect the explicit routing behavior.

Fixes #156580

Release note: None

158398: ui, sql: Record KV CPU Time in statement and transaction statistics r=alyshanjahani-crl a=alyshanjahani-crl

This commit starts recording the newly added kvCPUTime field from topLevelQueryStats and exposes it in the UI.

Fixes: https://cockroachlabs.atlassian.net/browse/CRDB-57265

Release note (ui change): KV CPU Time is now recorded to statement_statistics and transaction_statistics and is displayed in the SQL Activity page.

158538: mmaprototype: add more logging to candidate selection decisions r=angeladietz a=angeladietz

This adds more logging to help understand decisions MMA is making around when a lease/replica can be moved to another store.

Informs #158203

Epic: CRDB-55052

Release note: None

158595: ui: fix double dispatch when metric time is selected r=dhartunian a=dhartunian

Previously, when a new time window was selected in the time picker on the metrics page, we would dispatch a `ts/query` request twice because the first request would trigger a `useEffect` callback that triggered a second one. This was because of a desire to keep query params in the URL bar in sync with the time picker. We split up the query param update and the query param + redux update into two functions so that we can sync the params without touching redux. This eliminates the double request cycle since redux is now updated once.

Resolves: #158507

Epic: None

Release note (bug fix): the DB Console no longer dispatches duplicate metric query requests when new time windows were selected on the metrics page.

158596: kvnemesis: correctly verify the span config of the system range r=miraradeva a=miraradeva

Previously, kvnemesis verified the correct span config for some critical ranges: meta (r1), liveness (r2) and system (r3). However, as of 8844739, meta1 and meta2 start off as separate ranges, pushing the system range to r4. It's important for this range to be replicated correctly in kvnemesis to ensure it's available in the presence of network partitions. Unavailability of the system range can result in splits not being able to allocate new range IDs. This commit bumps up the max range ID used for span config verification from 3 to 4.

Informs: #158366

Release note: None

158606: mma: always incorporate RangeMsg.RangeLoad r=wenyihu6 a=sumeerbhola

Epic: CRDB-55052

Release note: None

158610: mma: tweak the comment of MakeStoreLoadMsg r=wenyihu6 a=sumeerbhola

This is mainly to clarify that we sum the load and capacity reported per store inside MMA, so tweaking the capacity calculation approach may not be possible in isolation.

Epic: CRDB-55052

Release note: None

158611: sql: increase TestScatterWithOneVoter observability r=iskettaneh a=iskettaneh

This commit traces the scatter statement, and adds some logging to help investigate future failures of this test.

Fixes: #158491

Release note: None

Co-authored-by: Jeff Swenson <[email protected]>
Co-authored-by: Matt White <[email protected]>
Co-authored-by: alyshanjahani-crl <[email protected]>
Co-authored-by: Angela Dietz <[email protected]>
Co-authored-by: David Hartunian <[email protected]>
Co-authored-by: Mira Radeva <[email protected]>
Co-authored-by: sumeerbhola <[email protected]>
Co-authored-by: iskettaneh <[email protected]>
Build failed (retrying...):
Build succeeded:
bulkmerge: add processors and DistSQL physical planning
This commit introduces the initial implementation of bulkmerge processors
for distributed merge operations. It adds three new processor types:
MergeLoopback, BulkMerge, and MergeCoordinator, along with the DistSQL
physical planning infrastructure to orchestrate them.
The MergeLoopback processor runs on the coordinator node and generates
initial tasks. The BulkMerge processors run on each SQL instance to
perform merge operations. The MergeCoordinator collects results from
all merge processors.
This commit also updates the vectorized execution engine (execplan.go)
to recognize the new processor cores, preventing panics when these
processors are encountered.
Informs #156580
Release note: None
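As a rough mental model of that flow, here is a toy, self-contained Go sketch (an assumed simplification; the real processors are DistSQL processors wired through routers, not goroutines and channels):

package main

import (
	"fmt"
	"sync"
)

func main() {
	tasks := make(chan int)
	results := make(chan string)

	// MergeLoopback stand-in: runs on the coordinator node and generates the
	// initial tasks.
	go func() {
		for taskID := 0; taskID < 4; taskID++ {
			tasks <- taskID
		}
		close(tasks)
	}()

	// BulkMerge stand-ins: one per SQL instance, each performing the merge
	// work for the tasks routed to it.
	var wg sync.WaitGroup
	for node := 1; node <= 2; node++ {
		wg.Add(1)
		go func(node int) {
			defer wg.Done()
			for taskID := range tasks {
				results <- fmt.Sprintf("task %d merged on node %d", taskID, node)
			}
		}(node)
	}
	go func() {
		wg.Wait()
		close(results)
	}()

	// MergeCoordinator stand-in: collects the results from all merge workers.
	for r := range results {
		fmt.Println(r)
	}
}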
backup: deduplicate routing logic with physicalplan package
Previously, the backup package contained duplicate implementations of
routingDatumsForSQLInstance and routingSpanForSQLInstance that were
identical to functions recently added to pkg/sql/physicalplan. This
commit removes these duplicates and uses the shared implementations:
RoutingDatumsForSQLInstance and MakeInstanceRouter.
This reduces code duplication and ensures consistent routing behavior
across DistSQL physical planning for both backup/restore and bulk merge
operations.
Informs #156580
Release note: None
physicalplan: set DefaultDest to nil to catch routing bugs
Changes MakeInstanceRouter to set DefaultDest to nil instead of stream 0.
When DefaultDest is nil, any routing key that doesn't match a span will
produce an error rather than silently routing to an arbitrary stream.
This helps catch coordination bugs early.
Updates merge_loopback.go to generate a routing key for its SQL instance
using physicalplan.RoutingDatumsForSQLInstance instead of a hardcoded
"loopback" key. This ensures the routing key matches one of the spans in
the router rather than relying on the DefaultDest fallback.
The test expectation is updated from "loopback->merge->coordinator" to
"node1->merge->coordinator" to reflect the explicit routing behavior.
Fixes #156580
Release note: None
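To illustrate the behavior this buys, a self-contained toy sketch (not the actual DistSQL range router): with no default destination, a routing key that matches no span becomes an error instead of being silently sent to stream 0.

package main

import (
	"bytes"
	"fmt"
)

// route is one (span -> stream) entry, loosely analogous to a router span.
type route struct {
	startKey, endKey []byte
	stream           int
}

// router mimics the relevant bit of a range router: a nil defaultDest means
// "error on keys that match no span" rather than falling back to a stream.
type router struct {
	routes      []route
	defaultDest *int
}

func (r *router) streamFor(key []byte) (int, error) {
	for _, rt := range r.routes {
		if bytes.Compare(key, rt.startKey) >= 0 && bytes.Compare(key, rt.endKey) < 0 {
			return rt.stream, nil
		}
	}
	if r.defaultDest != nil {
		return *r.defaultDest, nil
	}
	return 0, fmt.Errorf("no stream for routing key %q", key)
}

func main() {
	r := &router{routes: []route{{startKey: []byte("a"), endKey: []byte("m"), stream: 1}}}
	fmt.Println(r.streamFor([]byte("c"))) // 1 <nil>
	fmt.Println(r.streamFor([]byte("z"))) // 0 no stream for routing key "z"
}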